{
  "metadata" : {
    "name" : "AnomalyDetectionDemo",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : "/tmp/repo",
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : {
      "spark.app.name" : "AnomalyDetectionDemo",
      "spark.master" : "local[*]",
      "spark.driver.memory" : "10G",
      "spark.serializer.extraDebugInfo" : "true"
    }
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#Spark Use Case for Anomaly Detection"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Anomaly Detection with Apache Spark - inspired by Sean Owen's presentation https://www.youtube.com/watch?v=TC5cKYBZAeI\n\n#### Data available from: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Set up"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Get data"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### The unlabeled testdata "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import sys.process._\n\"rm -f /tmp/kddcup.data /tmp/kddcup.data.json /tmp/kddcup.testdata.unlabeled /tmp/kddcup.testdata.unlabeled.gz /tmp/kddcup.data.gz\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val file = \"/tmp/kddcup.testdata.unlabeled\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val dataUrl = \"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.testdata.unlabeled.gz\"\n\nval gz = \"/tmp/kddcup.testdata.unlabeled.gz\"\ns\"wget $dataUrl -O $gz\"!!\n\ns\"gunzip $gz\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### The labeled dataset "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val origfile = \"/tmp/kddcup.data\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import sys.process._\nval dataUrl = \"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz\"\n\nval gz = \"/tmp/kddcup.data.gz\"\ns\"wget $dataUrl -O $gz\"!!\n\ns\"gunzip $gz\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Create json version of the data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val jsonFields = List(\"duration\", \"protocol_type\", \"service\", \"flag\", \"src_bytes\", \"dst_bytes\", \"land\", \"wrong_fragment\", \"urgent\", \"hot\", \"num_failed_logins\", \"logged_in\", \"num_compromised\", \"root_shell\", \"su_attempted\", \"num_root\", \"num_file_creations\", \"num_shells\", \"num_access_files\", \"num_outbound_cmds\", \"is_host_login\", \"is_guest_login\", \"count\", \"srv_count\", \"serror_rate\", \"srv_serror_rate\", \"rerror_rate\", \"srv_rerror_rate\", \"same_srv_rate\", \"diff_srv_rate\", \"srv_diff_host_rate\", \"dst_host_count\", \"dst_host_srv_count\", \"dst_host_same_srv_rate\", \"dst_host_diff_srv_rate\", \"dst_host_same_src_port_rate\", \"dst_host_srv_diff_host_rate\", \"dst_host_serror_rate\", \"dst_host_srv_serror_rate\", \"dst_host_rerror_rate\", \"dst_host_srv_rerror_rate\", \"label\")\nval jsonFile = origfile + \".json\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// Convert the CSV training file into one JSON object per line; every value is written as a JSON string.\nval csvSource = scala.io.Source.fromFile(origfile)\nval json = csvSource.getLines.map { line =>\n  jsonFields.zip(line.split(\",\"))\n            .toMap\n            .map { case (k, v) => s\"\"\" \"$k\": \"$v\" \"\"\"}\n            .mkString(\"{\", \",\", \"}\")\n}\nval w = new java.io.FileWriter(new java.io.File(jsonFile))\ntry {\n  json.foreach { j => w.write(j + \"\\n\") }\n} finally {\n  // Release both file handles even if the conversion fails part-way through.\n  w.close()\n  csvSource.close()\n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Start the server externally on testdata"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : true,
      "output_stream_collapsed" : true,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":markdown\n```\ncd conf/demo\nsbt \"runMain notebook.demo.LineStreamer $file 9999\"\n```",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### All the imports"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.sql.SQLContext\nimport org.apache.spark.mllib.clustering._\nimport org.apache.spark.mllib.linalg._\nimport org.apache.spark.mllib.feature.StandardScaler\nimport org.apache.spark.mllib.util.MLUtils\nimport org.apache.spark.rdd._\nimport org.apache.spark.sql.Row\nimport org.apache.spark.streaming._\nimport org.apache.spark.Logging\nimport org.apache.log4j.{Level, Logger}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "***\n## Reading in and exploring the data -- Spark SQL (and DataFrame)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sqlContext = new SQLContext(sparkContext)\n//-rw-r--r--  1 radek  staff   4.3G 15 May 23:44 kddcup.data.json.big\nval dataFrame = sqlContext.jsonFile(jsonFile).cache",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### We'll be using the modified (json) training dataset from the competition"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Here is an example of the data:"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "output_stream_collapsed" : true,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// Show the first converted record; reuse jsonFile so this stays in sync with the conversion cell.\ns\"head -n 1 $jsonFile\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "dataFrame.printSchema",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### There are nearly 5 million records"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "dataFrame.count",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Let's look at the labels"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val labelsCount = dataFrame.groupBy(\"label\").count().collect",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "labelsCount.toList.map( row => (row.getString(0), row.getLong(1)))",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### For simplicity, selecting only numeric columns"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val nonNumericFrame = List(\"protocol_type\" ,\"service\", \"flag\")\nval labeledNumericFrame = dataFrame.select(\n  \"label\", \n  \"duration\",\n  \"src_bytes\",\n  \"dst_bytes\",\n  \"land\",\n  \"wrong_fragment\",\n  \"urgent\",\n  \"hot\",\n  \"num_failed_logins\",\n  \"logged_in\",\n  \"num_compromised\",\n  \"root_shell\",\n  \"su_attempted\",\n  \"num_root\",\n  \"num_file_creations\",\n  \"num_shells\",\n  \"num_access_files\",\n  \"num_outbound_cmds\",\n  \"is_host_login\",\n  \"is_guest_login\",\n  \"count\",\n  \"srv_count\",\n  \"serror_rate\",\n  \"srv_serror_rate\",\n  \"rerror_rate\",\n  \"srv_rerror_rate\",\n  \"same_srv_rate\",\n  \"diff_srv_rate\",\n  \"srv_diff_host_rate\",\n  \"dst_host_count\",\n  \"dst_host_srv_count\",\n  \"dst_host_same_srv_rate\",\n  \"dst_host_diff_srv_rate\",\n  \"dst_host_same_src_port_rate\",\n  \"dst_host_srv_diff_host_rate\",\n  \"dst_host_serror_rate\",\n  \"dst_host_srv_serror_rate\",\n  \"dst_host_rerror_rate\",\n  \"dst_host_srv_rerror_rate\"\n)\n\nlabeledNumericFrame.take(1)(0)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Running standard SQL queries against your dataset"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "dataFrame.registerTempTable(\"logs\")\nval allColumns = sqlContext.sql(\"SELECT * FROM logs WHERE protocol_type = 'udp' LIMIT 1\").first",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "***\n## Build a clustering model -- Spark MLlib"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Prepare the data"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Every row becomes (Label, Vector[numeric values])"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val labeledPoint = labeledNumericFrame.map(row => \n      (row.getString(0), Vectors.dense(row.toSeq.tail.map(s => if(s == null) 0.0 else s.toString.toDouble).toArray)))\n\nval rawData = labeledPoint.map(_._2)\nrawData.first",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Labels are not going to be used when building the model"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Scale the features and cache the results"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val scaler = new StandardScaler().fit(rawData)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val data = scaler.transform(rawData).cache",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "data.first.toArray.toList",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Use K-Means clustering"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val numIterations = 10 //in production it should be more\nval K = 150",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val clusters = KMeans.train(data, K, numIterations)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### In the meantime, let's have a look at the Spark UI"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Now that we have our model, let's apply it to the data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// This is a hack to work around the serialization problems that occur while serializing an anonymous function:\n// we rescope locally everything used by the function to be serialized,\n// we define the function using those instances,\n// and we launch the computations within the safe scope.\n@transient val ser = new java.io.Serializable {\n  val lp = labeledPoint\n  val cs = clusters\n  val sc = scaler\n  val f = (x:(String, org.apache.spark.mllib.linalg.Vector)) => (cs.predict(sc.transform(x._2)), x._1)\n  val predictions = lp.map(x => f(x))\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "ser.predictions",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### And let's see the clusters and their size"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val clustersWithSize = ser.predictions.map(x => (x._1, 1)).reduceByKey((x,y) => x+y)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "clustersWithSize.take(25).toList",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Use case assumption:\n**All clusters that contain only one point, labeled 'normal', are fishy**"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val clustersWithCountAndLabel = clustersWithSize.join(ser.predictions).distinct\nclustersWithCountAndLabel.take(20).toList",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "//clusters with 1 point and labeled as normal\nval suspectedAnomalousClusters = clustersWithCountAndLabel.filter(x => x._2._1 == 1 && x._2._2 == \"normal.\").map( x => x._1 )",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Now we have discovered the anomalous clusters"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val anomalousClusters = suspectedAnomalousClusters.collect\nanomalousClusters.toList",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "***\n## Listen to the stream of events and predict anomaly -- Spark streaming"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### To create the real-time stream we will use the test dataset from the competition (in CSV format)"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "<code>0,tcp,private,REJ,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,107,6,0.00,0.00,1.00,1.00,\n0.06,0.07,0.00,255,6,0.02,0.05,0.00,0.00,0.00,0.00,1.00,1.00</code>\n"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### We will use a simple Java app that reads in the logs and sends them to a TCP socket"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Create streaming context with batch 2s "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val ssc = new StreamingContext(sparkContext, Seconds(2))",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Turn down the logging"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "Logger.getRootLogger.setLevel(Level.ERROR)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Helper method for removing non-numeric columns"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "/**\n * Builds dense feature vectors from split CSV records, skipping the\n * non-numeric columns at positions 1, 2 and 3.\n * Null fields become 0.0; every other kept field is parsed with toDouble.\n */\ndef extractNumericColumns(r: RDD[Array[String]]): RDD[Vector] = {\n  val nonNumericColumns = Set(1, 2, 3)\n  r.map { fields =>\n    // Select by each field's own position. indexOf(value) finds the first equal\n    // value instead, which picks the wrong column whenever values repeat.\n    val numeric = fields.zipWithIndex.collect {\n      case (value, idx) if !nonNumericColumns.contains(idx) =>\n        if (value == null) 0.0 else value.toDouble\n    }\n    Vectors.dense(numeric)\n  }\n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Helper method for finding the anomalies"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "/** Keeps the cluster ids that appear in anomalousClusters and renders each one as a warning message. */\ndef findAnomaly(r: RDD[Int], anomalousClusters: Array[Int]): RDD[String] = {\n  val flagged = r.filter(clusterId => anomalousClusters.contains(clusterId))\n  flagged.map(clusterId => \"Suspected anomaly - entry in cluster \" + clusterId)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "###  Start listening to the stream"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val lines = ssc.socketTextStream(\"localhost\", 9999)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// Streaming pipeline: split each line on commas, drop the non-numeric columns,\n// scale the features, predict a cluster and print the entries that land in an anomalous cluster.\n// The model, scaler and anomalous cluster ids are copied into local vals inside a Serializable wrapper,\n// the same rescoping pattern used for the batch predictions earlier in this notebook.\n@transient val safeScope = new java.io.Serializable {\n  val cs = clusters\n  val sc = scaler\n  val ac = anomalousClusters\n  val compute = lines .map(x => x.split(\",\"))\n                      .transform(y => extractNumericColumns(y))\n                      .transform(x => cs.predict(sc.transform(x)))\n                      .transform(x => findAnomaly(x, ac)).print\n  \n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "# Hackers and fraudsters beware!"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "ssc.start()\nssc.awaitTermination()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "markdown",
    "source" : "#### The Spark UI now shows the Streaming tab"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "ssc.stop()",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}